package com.atguigu.flink.demo01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Arrays;
import java.util.Properties;

/**
 *
 * 无界流
 * @author admin
 * @date 2021/8/6
 */
public class UnboundedStreamByKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // topic
        String topic="flink_kafka";

        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,topic);


        DataStreamSource<String> source = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<Tuple2<String, Integer>> fl = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING,Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = fl.keyBy(0).sum(1);

        sum.print("test");

        env.execute("kafka");

    }
}
